<?php


namespace NanQi\Hope\Crontab;


use Carbon\Carbon;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Crontab\Crontab as HyperfCrontab;
use Hyperf\Crontab\PipeMessage;
use Hyperf\Crontab\Strategy\Executor;
use NanQi\Hope\Base\BaseCron;
use NanQi\Hope\Crontab\Batch\BaseBatchCron;
use NanQi\Hope\Crontab\Batch\BatchPipeInterface;
use NanQi\Hope\Helper;
use Swoole\Timer;

class CrontabExecutor extends Executor implements BatchPipeInterface
{
    use Helper;

    /**
     * Whether new crontab tasks may still be scheduled. Set to false by stop().
     *
     * @var bool
     */
    public static $running = true;

    /**
     * Number of tasks scheduled by execute() whose timer callback has not finished yet.
     *
     * @var int
     */
    public static $runningCount = 0;

    /**
     * Pipe-message handler: forwards the received crontab to execute().
     *
     * @param HyperfCrontab $crontab      Crontab to schedule.
     * @param mixed         $fromWorkerId Not used by this implementation.
     */
    public function pipe($crontab, $fromWorkerId)
    {
        $this->execute($crontab);
    }

    /**
     * Schedule a crontab on a Swoole timer so it fires at its execute time.
     *
     * Nothing is scheduled when the crontab is not this package's Crontab type,
     * has no execute time, or when stop() has already been requested.
     *
     * @param HyperfCrontab $crontab
     */
    public function execute(HyperfCrontab $crontab)
    {
        if (! $crontab instanceof Crontab || ! $crontab->getExecuteTime()) {
            return;
        }

        // Once shutdown has been requested, no new cron task is started.
        if (! self::$running) {
            return;
        }

        // Build the task first so a failure here cannot leave the counter incremented.
        $task = $this->runOnOneServer($crontab, $this->getRun($crontab));

        $diff = $crontab->getExecuteTime()->diffInRealSeconds(new Carbon());
        $delayMs = $diff > 0 ? $diff * 1000 : 1;

        // Every scheduled execution is counted until its timer callback returns.
        self::$runningCount++;

        // The counter is released here rather than inside the closure built by
        // getRun(): the wrapper returned by runOnOneServer() may return without
        // ever invoking that closure (in Hyperf's Executor this happens when the
        // server mutex is not acquired - verify against the installed version),
        // which would otherwise leave the counter stuck and keep stop() polling
        // forever.
        $timerId = Timer::after($delayMs, static function () use ($task) {
            try {
                $task();
            } finally {
                self::$runningCount--;
            }
        });

        // Timer::after() reports failure with false; the callback will never run then.
        if ($timerId === false) {
            self::$runningCount--;
        }
    }

    /**
     * Build the closure that runs the crontab callback and then its follow-up crontabs.
     *
     * Any Throwable raised while running is logged and swallowed. The closure does
     * not touch self::$runningCount; execute() releases the counter around it.
     *
     * @param Crontab $crontab
     * @return \Closure
     */
    protected function getRun(Crontab $crontab)
    {
        return function () use ($crontab) {
            try {
                [$class, $method] = $crontab->getCallback();
                // Resolved before the callback is validated, so a container
                // failure is reported by the catch below.
                /** @var BaseCron $instance */
                $instance = di($class);
                if ($class && $method && class_exists($class) && method_exists($class, $method)) {
                    $this->getLog()->info("Crontab[{$instance->getCrontabName()}] START");
                    $instance->{$method}();
                    $this->getLog()->info("Crontab[{$instance->getCrontabName()}] DONE");
                }

                // Workflow: run the crontabs chained after this one.
                foreach ($crontab->getNextCrontabList() as $nextClass) {
                    $instance = di($nextClass);
                    $this->getLog()->info("NextCrontab[{$instance->getCrontabName()}] START");
                    $server = $this->getServer();
                    $workerId = $server->getWorkerId();
                    if ($instance instanceof BaseBatchCron
                        && $workerId != 0) {
                        // Batch crons are handed over to worker 0 instead of running here.
                        $server->sendMessage(new PipeMessage(
                            'callback',
                            [$nextClass, 'execute'],
                            $crontab
                        ), 0);
                    } else {
                        $instance->execute();
                    }
                    $this->getLog()->info("NextCrontab[{$instance->getCrontabName()}] DONE");
                }
            } catch (\Throwable $throwable) {
                $this->getLog()->error("Crontab[{$crontab->getName()}] ERROR:" . $throwable->getMessage()
                    . PHP_EOL . $throwable->getTraceAsString());
            }
        };
    }

    /**
     * Stop the crontab executor.
     *
     * Blocks new executions immediately, then polls once per second until no
     * scheduled task is outstanding and writes the Redis key
     * "stop:crontab:<app_name>" with the value '1'.
     */
    public function stop()
    {
        self::$running = false;

        // Swoole passes the timer id as the first argument of the tick callback.
        Timer::tick(1000, function ($timerId) {
            $this->getLog()->debug('count:' . self::$runningCount);
            if (self::$runningCount > 0) {
                return;
            }

            Timer::clear($timerId);
            $this->getLog()->debug('stop start');

            /** @var ConfigInterface $config */
            $config = di(ConfigInterface::class);

            $this->getRedisBusiness()->set(
                'stop:crontab:' . $config->get('app_name'), '1');
        });
    }
}